package net.courier.mq.provider.rabbitmq;

import java.io.IOException;

import net.courier.mq.ExchangeType;
import net.courier.mq.MessageSendClient;
import net.courier.mq.MessageSendClientPool;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ReturnListener;
import com.rabbitmq.client.ShutdownListener;
import com.rabbitmq.client.ShutdownSignalException;

public class RabbitMessageSendClientPool implements MessageSendClientPool, ShutdownListener, ReturnListener {
	private final Logger LOGGER = LoggerFactory.getLogger(RabbitMessageSendClientPool.class);
	private static final ExchangeType DEFAULT_EXCHANGE_TYPE = ExchangeType.DIRECT;

	private String hosts;
	private RabbitChannelFactory channelFactory;
	private String exchangeName;
	private ExchangeType exchangeType = DEFAULT_EXCHANGE_TYPE;
	private Channel channel;

	private final RabbitMessageSendClient client;

	public RabbitMessageSendClientPool() {
		client = new RabbitMessageSendClient();
		client.setChannel(channel);
	}

	@Override
	public MessageSendClient getClient() {
		connectChannel();
		return client;
	}

	@Override
	public void setExchangeType(ExchangeType exchangeType) {
		this.exchangeType = exchangeType;
	}

	@Override
	public void setExchangeName(String exchangeName) {
		this.exchangeName = exchangeName;
		client.setExchangeName(exchangeName);
	}

	@Override
	public void setHost(String hosts) {
		this.hosts = hosts;
	}
	
	@Override
	public synchronized void destroy() {
		channelFactory.destroy();
		channel = null;
	}

	public void setChannelFactory(RabbitChannelFactory channelFactory) {
		this.channelFactory = channelFactory;
	}

	public void setPollTimeout(long pollTimeout) {
		client.setPollTimeout(pollTimeout);
	}

	private RabbitChannelFactory createChannelFactory() {
		if (channelFactory != null) {
			return channelFactory;
		}

		RabbitConnectionFactory connectionFactory = new RabbitConnectionFactory();
		connectionFactory.setHosts(hosts);

		channelFactory = new RabbitChannelFactory();
		channelFactory.setConnectionFactory(connectionFactory);

		return channelFactory;

	}

	private void connectChannel() {
		if (channel == null || !channel.isOpen()) {
			try {
				channel = createChannelFactory().createChannel();
				channel.getConnection().addShutdownListener(this);
				channel.addReturnListener(this);
				channel.exchangeDeclare(exchangeName, exchangeType.toString());
				if (LOGGER.isInfoEnabled()) {
					LOGGER.info("已连接到exchange [" + exchangeName + "(" + exchangeType + ")]");
				}
			} catch (IOException e) {
				LOGGER.warn("无法连接到Channel", e);
			}
		}
	}
	
	@Override
	public void shutdownCompleted(ShutdownSignalException cause) {
		if (LOGGER.isInfoEnabled()) {
			LOGGER.info("Channel连接丢失，原因 [" + cause.getReason() + "]");
			LOGGER.info("Reference [" + cause.getReference() + "]");
		}

		if (cause.isInitiatedByApplication()) {
			if (LOGGER.isInfoEnabled()) {
				LOGGER.info("被应用关闭");
			}
		} else if (cause.isHardError()) {
			LOGGER.error("由于硬件问题关闭Channel，尝试进行重新连接...");
			connectChannel();
		}
	}

	@Override
	public void handleReturn(int replyCode, String replyText, String exchange, String routingKey,
			AMQP.BasicProperties properties, byte[] body) {
		LOGGER.warn("收到服务器反馈，返回码[" + replyCode + "]，返回消息[" + replyText + "]");
	}
}
